package com.jscloud.bigdata.hbase.mr3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 *  * 需求【重点，会在实际项目中用到】
 *   * 通过bulkload的方式批量加载数据到HBase表中
 *   * 将我们hdfs上面的这个路径/hbase/input/user.txt的数据文件，转换成HFile格式，然后load到myuser2这张表里面去
 *
 * * 知识点描述
 *   - 加载数据到HBase当中去的方式多种多样，我们可以使用HBase的javaAPI或者使用sqoop将我们的数据写入或者导入到HBase当中去，但是这些方式不是最佳的，因为在导入的过程中占用Region资源导致效率低下
 *   - 我们也可以通过MR的程序，将我们的数据直接转换成HBase的最终存储格式HFile，然后直接load数据到HBase当中去即可
 *
 *好处
 * - 导入过程不占用Region资源
 * - 能快速导入海量的数据
 * - 节省内存
 *
 */
public class HBaseBulkLoad extends Configured implements Tool {
        public static void main(String[] args) throws Exception {
                Configuration configuration = HBaseConfiguration.create();
                //设定zk集群
                configuration.set("hbase.zookeeper.quorum", "bigdata01:2181,bigdata02:2181,bigdata03:2181");

                int run = ToolRunner.run(configuration, new HBaseBulkLoad(), args);
                System.exit(run);
        }
        @Override
        public int run(String[] args) throws Exception {
                Configuration conf = super.getConf();
                Job job = Job.getInstance(conf);
                job.setJarByClass(HBaseBulkLoad.class);

                FileInputFormat.addInputPath(job, new Path("hdfs://bigdata01:8020/hbase/input"));
                job.setMapperClass(BulkLoadMapper.class);
                job.setMapOutputKeyClass(ImmutableBytesWritable.class);
                job.setMapOutputValueClass(Put.class);

                Connection connection = ConnectionFactory.createConnection(conf);
                Table table = connection.getTable(TableName.valueOf("myuser2"));

                //使MR可以向myuser2表中，增量增加数据
                HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("myuser2")));
                //数据写回到HDFS，写成HFile -> 所以指定输出格式为HFileOutputFormat2
                job.setOutputFormatClass(HFileOutputFormat2.class);
                HFileOutputFormat2.setOutputPath(job, new Path("hdfs://bigdata01:8020/hbase/out_hfile"));

                //开始执行
                boolean b = job.waitForCompletion(true);

                return b? 0: 1;
        }
}
